{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "title: \"End-To-End Example with Scikit-Learn, Hops and the Feature Store - Iris Flower Classification\"\n",
    "date: 2021-02-24\n",
    "type: technical_note\n",
    "draft: false\n",
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# End-To-End Example with Scikit-Learn, HSML and the Feature Store - Iris Flower Classification\n",
    "---\n",
    "*FEATURE ENGINEERING --> MODEL TRAINING --> MODEL SERVING --> MODEL MONITORING*\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this notebook we will, \n",
    "\n",
    "1. Load the Iris Flower dataset from HopsFS\n",
    "2. Do feature engineering on the dataset\n",
    "3. Save the features to the feature store\n",
    "4. Read the feature data from the feature store\n",
    "5. Train a KNN Model using SkLearn\n",
    "6. Save the trained model to HopsFS\n",
    "7. Launch a serving instance to serve the trained model\n",
    "8. Send some prediction requests to the served model\n",
    "9. Monitor the predictions through Kafka"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Model Serving on [Hopsworks](https://github.com/logicalclocks/hopsworks)\n",
    "\n",
    "![hops.png](../../images/hops.png)\n",
    "\n",
    "## The `hops` python library\n",
    "\n",
    "`hops` is a helper library for Hops that facilitates development by hiding the complexity of running applications and iteracting with services.\n",
    "\n",
    "Have a feature request or encountered an issue? Please let us know on <a href=\"https://github.com/logicalclocks/hops-util-py\">github</a>.\n",
    "\n",
    "### Using the `experiment` module\n",
    "\n",
    "To be able to run your Machine Learning code in Hopsworks, the code for the whole program needs to be provided and put inside a wrapper function. Everything, from importing libraries to reading data and defining the model and running the program needs to be put inside a wrapper function.\n",
    "\n",
    "The `experiment` module provides an api to Python programs such as TensorFlow, Keras and PyTorch on a Hopsworks on any number of machines and GPUs.\n",
    "\n",
    "An Experiment could be a single Python program, which we refer to as an **Experiment**. \n",
    "\n",
    "Grid search or genetic hyperparameter optimization such as differential evolution which runs several Experiments in parallel, which we refer to as **Parallel Experiment**. \n",
    "\n",
    "ParameterServerStrategy, CollectiveAllReduceStrategy and MultiworkerMirroredStrategy making multi-machine/multi-gpu training as simple as invoking a function for orchestration. This mode is referred to as **Distributed Training**.\n",
    "\n",
    "### Using the `hdfs` module\n",
    "The `hdfs` module provides a method to get the path in HopsFS where your data is stored, namely by calling `hdfs.project_path()`. The path resolves to the root path for your project, which is the view that you see when you click `Data Sets` in HopsWorks. To point where your actual data resides in the project you to append the full path from there to your Dataset. For example if you create a mnist folder in your Resources Dataset, the path to the mnist data would be `hdfs.project_path() + 'Resources/mnist'`\n",
    "\n",
    "```python\n",
    "# Use this module to get the path to your project in HopsFS, then append the path to your Dataset in your project\n",
    "from hops import hdfs\n",
    "project_path = hdfs.project_path()\n",
    "```\n",
    "\n",
    "```python\n",
    "from hops import hdfs\n",
    "# Uploading the traied model to hdfs\n",
    "hdfs.copy_to_hdfs(\"iris_knn.pkl\", \"Resources/iris_flower\", overwrite=True)\n",
    "\n",
    "# Downloading the iris flower model to the current working directory\n",
    "iris_flower_model_hdfs_path = hdfs.project_path() + \"Resources/iris_flower/iris_knn.pkl\"\n",
    "local_iris_flower_model_path = hdfs.copy_to_local(iris_flower_model_hdfs_path)\n",
    "```\n",
    "\n",
    "### Documentation\n",
    "See the following links to learn more about running experiments in Hopsworks\n",
    "\n",
    "- <a href=\"https://hopsworks.readthedocs.io/en/latest/hopsml/experiment.html\">Learn more about experiments</a>\n",
    "<br>\n",
    "- <a href=\"https://hopsworks.readthedocs.io/en/latest/hopsml/hopsML.html\">Building end-to-end pipelines</a>\n",
    "<br>\n",
    "- Give us a star, create an issue or a feature request on  <a href=\"https://github.com/logicalclocks/hopsworks\">Hopsworks github</a>\n",
    "\n",
    "### Managing experiments\n",
    "Experiments service provides a unified view of all the experiments run using the `experiment` module.\n",
    "<br>\n",
    "As demonstrated in the gif it provides general information about the experiment and the resulting metric. Experiments can be visualized meanwhile or after training in a TensorBoard.\n",
    "<br>\n",
    "<br>\n",
    "![Image7-Monitor.png](../../images/experiments.gif)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th></tr><tr><td>0</td><td>application_1652804119304_0001</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"/hopsworks-api/yarnui/https://hopsworks0.logicalclocks.com:8089/proxy/application_1652804119304_0001/\">Link</a></td><td><a target=\"_blank\" href=\"/hopsworks-api/yarnui/https://hopsworks0.logicalclocks.com:8044/node/containerlogs/container_1652804119304_0001_01_000001/demo_ml_meb10000__meb10000\">Link</a></td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "from sklearn.neighbors import KNeighborsClassifier\n",
    "from sklearn.metrics import accuracy_score\n",
    "import joblib\n",
    "from pyspark.ml.feature import StringIndexer\n",
    "from pyspark.sql.types import IntegerType\n",
    "import numpy as np\n",
    "import time\n",
    "import json\n",
    "from hops import kafka, hdfs, tls\n",
    "from confluent_kafka import Producer, Consumer, KafkaError\n",
    "import random"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prepare Training Dataset\n",
    "\n",
    "### Load Iris Dataset (csv)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "project_path = hdfs.project_path()\n",
    "iris_df = spark.read.format(\"csv\").option(\"header\", \"true\").option(\"inferSchema\", True).load(\n",
    "    project_path + \"TourData/iris/iris.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- sepal_length: double (nullable = true)\n",
      " |-- sepal_width: double (nullable = true)\n",
      " |-- petal_length: double (nullable = true)\n",
      " |-- petal_width: double (nullable = true)\n",
      " |-- variety: string (nullable = true)"
     ]
    }
   ],
   "source": [
    "iris_df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Feature  Engineering\n",
    "\n",
    "The dataset is already quite well prepared, the only thing we need to for feature engineering is to convert the `variety` column to numeric and save a lookup table so that we later on can convert the numeric representation back to the categorical representation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- sepal_length: double (nullable = true)\n",
      " |-- sepal_width: double (nullable = true)\n",
      " |-- petal_length: double (nullable = true)\n",
      " |-- petal_width: double (nullable = true)\n",
      " |-- label: integer (nullable = true)"
     ]
    }
   ],
   "source": [
    "encoder = StringIndexer(inputCol=\"variety\", outputCol=\"label\")\n",
    "fit_model = encoder.fit(iris_df)\n",
    "iris_df1 = fit_model.transform(iris_df)\n",
    "lookup_df = iris_df1.select([\"variety\", \"label\"]).distinct()\n",
    "iris_df2 = iris_df1.drop(\"variety\")\n",
    "iris_df3 = iris_df2.withColumn(\"label\", iris_df2[\"label\"].cast(IntegerType()))\n",
    "iris_df3.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+-----------+------------+-----------+-----+\n",
      "|sepal_length|sepal_width|petal_length|petal_width|label|\n",
      "+------------+-----------+------------+-----------+-----+\n",
      "|         5.1|        3.5|         1.4|        0.2|    0|\n",
      "|         4.9|        3.0|         1.4|        0.2|    0|\n",
      "|         4.7|        3.2|         1.3|        0.2|    0|\n",
      "|         4.6|        3.1|         1.5|        0.2|    0|\n",
      "|         5.0|        3.6|         1.4|        0.2|    0|\n",
      "+------------+-----------+------------+-----------+-----+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "iris_df3.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----------+-----+\n",
      "|   variety|label|\n",
      "+----------+-----+\n",
      "| Virginica|  2.0|\n",
      "|Versicolor|  1.0|\n",
      "|    Setosa|  0.0|\n",
      "+----------+-----+"
     ]
    }
   ],
   "source": [
    "lookup_df.show(3)"
   ]
  },
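  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The encoding and lookup steps above can be sketched without Spark. This is a minimal pure-Python illustration, not the `StringIndexer` implementation: it assumes the default frequency-descending ordering with ties broken alphabetically, and the `index_labels` helper and the sample values are hypothetical.\n",
    "\n",
    "```python\n",
    "from collections import Counter\n",
    "\n",
    "def index_labels(values):\n",
    "    # Most frequent value gets index 0; ties are broken alphabetically (assumption)\n",
    "    counts = Counter(values)\n",
    "    ordered = sorted(counts, key=lambda v: (-counts[v], v))\n",
    "    return {value: index for index, value in enumerate(ordered)}\n",
    "\n",
    "# Hypothetical sample of the `variety` column\n",
    "varieties = [\"Setosa\", \"Virginica\", \"Versicolor\", \"Setosa\", \"Versicolor\", \"Virginica\"]\n",
    "\n",
    "variety_to_label = index_labels(varieties)\n",
    "labels = [variety_to_label[v] for v in varieties]\n",
    "\n",
    "# The lookup table is the inverse mapping, used to decode predictions later\n",
    "label_to_variety = {label: variety for variety, label in variety_to_label.items()}\n",
    "decoded = [label_to_variety[label] for label in labels]\n",
    "\n",
    "print(variety_to_label)\n",
    "print(decoded == varieties)\n",
    "```\n",
    "\n",
    "Keeping the inverse mapping next to the features is what later allows a numeric prediction to be reported as a variety name."
   ]
  },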
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Save Features to the Feature Store\n",
    "\n",
    "We can save two feature groups (hive tables), one called `iris_features` that contains the iris features and the corresponding numeric label, and another feature group called `iris_labels_lookup` for converting the numeric iris label back to categorical.\n",
    "\n",
    "**Note**: To be able to run the feature store code, you first have to enable the Feature Store Service in your project. To do this, go to the \"Settings\" tab in your project, select the feature store service and click \"Save\". "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected. Call `.close()` to terminate connection gracefully."
     ]
    }
   ],
   "source": [
    "import hsfs\n",
    "# Create a connection\n",
    "connection = hsfs.connection()\n",
    "# Get the feature store handle for the project's feature store\n",
    "fs = connection.get_feature_store()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "iris_features = fs.create_feature_group(name=\"iris_features\", version=1, time_travel_format=None)\n",
    "iris_features.save(iris_df3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "iris_labels_lookup = fs.create_feature_group(name=\"iris_labels_lookup\", version=1, time_travel_format=None)\n",
    "iris_labels_lookup.save(lookup_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Read the Iris Training Dataset from the Feature Store"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "   sepal_length  sepal_width  petal_length  petal_width  label\n",
      "0           5.1          3.5           1.4          0.2      0\n",
      "1           4.9          3.0           1.4          0.2      0\n",
      "2           4.7          3.2           1.3          0.2      0\n",
      "3           4.6          3.1           1.5          0.2      0\n",
      "4           5.0          3.6           1.4          0.2      0\n",
      "5           5.4          3.9           1.7          0.4      0\n",
      "6           4.6          3.4           1.4          0.3      0\n",
      "7           5.0          3.4           1.5          0.2      0\n",
      "8           4.4          2.9           1.4          0.2      0\n",
      "9           4.9          3.1           1.5          0.1      0"
     ]
    }
   ],
   "source": [
    "iris_features = fs.get_feature_group(\"iris_features\", 1)\n",
    "df = iris_features.read().toPandas()\n",
    "df.head(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "       sepal_length  sepal_width  petal_length  petal_width       label\n",
      "count    150.000000   150.000000    150.000000   150.000000  150.000000\n",
      "mean       5.843333     3.057333      3.758000     1.199333    1.000000\n",
      "std        0.828066     0.435866      1.765298     0.762238    0.819232\n",
      "min        4.300000     2.000000      1.000000     0.100000    0.000000\n",
      "25%        5.100000     2.800000      1.600000     0.300000    0.000000\n",
      "50%        5.800000     3.000000      4.350000     1.300000    1.000000\n",
      "75%        6.400000     3.300000      5.100000     1.800000    2.000000\n",
      "max        7.900000     4.400000      6.900000     2.500000    2.000000"
     ]
    }
   ],
   "source": [
    "df.describe()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "x_df = df[['sepal_length', 'sepal_width', 'petal_length', 'petal_width']]\n",
    "y_df = df[[\"label\"]]\n",
    "X = x_df.values\n",
    "y = y_df.values.ravel()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train an Iris Flower Classifier"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "IRIS_FLOWER_RESOURCES_PATH = \"/Resources/iris_flower\"\n",
    "IRIS_FLOWER_MODEL_PKL = \"iris_knn.pkl\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate a KNN Model using the Feature Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0.98"
     ]
    }
   ],
   "source": [
    "neighbors = random.randint(3, 30)\n",
    "iris_knn = KNeighborsClassifier(n_neighbors=neighbors)\n",
    "iris_knn.fit(X, y)\n",
    "y_pred = iris_knn.predict(X)\n",
    "acc = accuracy_score(y, y_pred)\n",
    "print(acc)"
   ]
  },
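  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To illustrate what the classifier does at prediction time, here is a minimal pure-Python sketch of k-nearest-neighbours majority voting. It is not how scikit-learn implements `KNeighborsClassifier`; the `knn_predict` helper and the toy points are hypothetical.\n",
    "\n",
    "```python\n",
    "import math\n",
    "from collections import Counter\n",
    "\n",
    "def knn_predict(train_X, train_y, query, k=3):\n",
    "    # Keep the k training points closest to the query (Euclidean distance)\n",
    "    nearest = sorted(zip(train_X, train_y), key=lambda p: math.dist(p[0], query))[:k]\n",
    "    # Majority vote among the labels of those k neighbours\n",
    "    votes = Counter(label for _, label in nearest)\n",
    "    return votes.most_common(1)[0][0]\n",
    "\n",
    "# Hypothetical toy data: (petal_length, petal_width) -> label\n",
    "train_X = [(1.4, 0.2), (1.3, 0.2), (4.5, 1.5), (4.7, 1.4), (6.0, 2.5), (5.9, 2.1)]\n",
    "train_y = [0, 0, 1, 1, 2, 2]\n",
    "\n",
    "pred_small = knn_predict(train_X, train_y, (1.5, 0.3))\n",
    "pred_large = knn_predict(train_X, train_y, (5.8, 2.2))\n",
    "print(pred_small, pred_large)\n",
    "```\n",
    "\n",
    "Note that the accuracy printed above is measured on the same rows the model was fitted on, so it is likely an optimistic estimate of how the model performs on unseen flowers."
   ]
  },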
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Save the Trained Model to HopsFS"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "joblib.dump(iris_knn, IRIS_FLOWER_MODEL_PKL)\n",
    "hdfs.mkdir(IRIS_FLOWER_RESOURCES_PATH)\n",
    "hdfs.copy_to_hdfs(IRIS_FLOWER_MODEL_PKL, IRIS_FLOWER_RESOURCES_PATH, overwrite=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Export the Trained Model to the Models Repository"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "It is a best-practice to put the python script that loads and run the model together with the trained model files. Below is the code for copying the script into the iris flower resources folder, which will be used when exporting the model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "PREDICTOR_SCRIPT_NAME = \"iris_flower_classifier.py\"\n",
    "PREDICTOR_SCRIPT_PATH = \"Jupyter/end_to_end_pipeline/sklearn/\" + PREDICTOR_SCRIPT_NAME\n",
    "\n",
    "hdfs.cp(PREDICTOR_SCRIPT_PATH, IRIS_FLOWER_RESOURCES_PATH + \"/\" + PREDICTOR_SCRIPT_NAME, overwrite=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected. Call `.close()` to terminate connection gracefully.\n",
      "Exported model irisflowerclassifier with version 1\n",
      "Model('irisflowerclassifier', 1, 'SKLEARN')\n",
      "Model export complete: 100%|##########| 6/6 [00:13<00:00,  2.33s/it]                   "
     ]
    }
   ],
   "source": [
    "import hsml\n",
    "from hsml.schema import Schema\n",
    "from hsml.model_schema import ModelSchema\n",
    "\n",
    "MODEL_NAME = \"irisflowerclassifier\"\n",
    "\n",
    "input_schema = Schema(x_df)\n",
    "output_schema = Schema(y_df)\n",
    "\n",
    "conn = hsml.connection()\n",
    "mr = conn.get_model_registry()\n",
    "\n",
    "tf_model = mr.sklearn.create_model(MODEL_NAME, \n",
    "                                   metrics={'accuracy': acc},\n",
    "                                   input_example=x_df,\n",
    "                                   model_schema=ModelSchema(input_schema=input_schema, output_schema=output_schema))\n",
    "\n",
    "tf_model.save(IRIS_FLOWER_RESOURCES_PATH)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1\n",
      "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/input_example.json\n",
      "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_flower_classifier.py\n",
      "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/iris_knn.pkl\n",
      "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/model_schema.json\n",
      "hdfs://rpc.namenode.service.consul:8020/Projects/demo_ml_meb10000/Models/irisflowerclassifier/1/program.ipynb"
     ]
    }
   ],
   "source": [
    "for p in hdfs.ls(\"Models/\" + MODEL_NAME, recursive=True):\n",
    "    print(p)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Serve the Trained Model\n",
    "\n",
    "To serve a SkLearn Model, write a python script that downloads the HDFS model in the constructor and saves it as a class variable and then implements the `Predict` class and the methods `predict`, `classify` and `regress`, like this:\n",
    "\n",
    "```python\n",
    "from sklearn.externals import joblib\n",
    "from hops import hdfs\n",
    "\n",
    "class Predict(object):\n",
    "\n",
    "    def __init__(self):\n",
    "        \"\"\"Prepare and load a trained model\"\"\"\n",
    "        self.model_path = \"Models/irisflowerclassifier/1/iris_knn.pkl\"\n",
    "        print(\"Copying scikit-learn model from HDFS to local directory\")\n",
    "        hdfs.copy_to_local(self.model_path)\n",
    "        print(\"Loading local scikit-learn model\")\n",
    "        self.model = joblib.load(\"./iris_knn.pkl\")\n",
    "        print(\"Trained model is ready\")\n",
    "\n",
    "    def predict(self, inputs):\n",
    "        \"\"\"Serves prediction using a trained model\"\"\"\n",
    "        return self.model.predict(inputs).tolist() # Numpy Arrays are note JSON serializable\n",
    "```\n",
    "\n",
    "Then upload this python script to some folder in your project and go to the \"Model Serving\" service in Hopsworks:\n",
    "\n",
    "![sklearn_serving1.png](./../../images/sklearn_serving1.png)\n",
    "\n",
    "Then click on \"create serving\" and configure your serving:\n",
    "\n",
    "![sklearn_serving2.png](./../../images/sklearn_serving2.png)\n",
    "\n",
    "Once the serving is created, you can start it and view information like server-logs and which kafka topic it is logging inference requests to.\n",
    "\n",
    "![sklearn_serving3.png](./../../images/sklearn_serving3.png)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Query Model Repository for best IrisFlowerClassifier Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model name: irisflowerclassifier\n",
      "Model version: 1\n",
      "{'accuracy': '0.98'}"
     ]
    }
   ],
   "source": [
    "# Get best version of the iris flower classifier model\n",
    "best_model = mr.get_best_model(MODEL_NAME, \"accuracy\", \"max\")\n",
    "\n",
    "print('Model name: ' + best_model.name)\n",
    "print('Model version: ' + str(best_model.version))\n",
    "print(best_model.training_metrics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a Deployment for the Trained Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "PREDICTOR_SCRIPT_PATH = best_model.version_path + \"/\" + PREDICTOR_SCRIPT_NAME\n",
    "\n",
    "# Deploy the best version of the model\n",
    "irisclassifier = best_model.deploy(script_file=PREDICTOR_SCRIPT_PATH)  # To serve models with KServe, set serving_tool to \"KSERVE\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After the serving have been created, you can find it in the Hopsworks UI by going to the \"Deployments\" tab. You can also use the class attributes or the `.describe()` method of a deployment object to describe access its metadata."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Description of irisflowerclassifier\n",
      "{\n",
      "    \"artifact_version\": 1,\n",
      "    \"batching_enabled\": false,\n",
      "    \"created\": \"2022-05-18T12:40:06.464Z\",\n",
      "    \"creator\": \"Admin Admin\",\n",
      "    \"id\": 1,\n",
      "    \"inference_logging\": \"ALL\",\n",
      "    \"kafka_topic_dto\": {\n",
      "        \"name\": \"CREATE\",\n",
      "        \"num_of_partitions\": 1,\n",
      "        \"num_of_replicas\": 1\n",
      "    },\n",
      "    \"model_name\": \"irisflowerclassifier\",\n",
      "    \"model_path\": \"/Projects/demo_ml_meb10000/Models/irisflowerclassifier\",\n",
      "    \"model_server\": \"PYTHON\",\n",
      "    \"model_version\": 1,\n",
      "    \"name\": \"irisflowerclassifier\",\n",
      "    \"predictor\": \"iris_flower_classifier.py\",\n",
      "    \"predictor_resource_config\": {\n",
      "        \"cores\": 1,\n",
      "        \"gpus\": 0,\n",
      "        \"memory\": 1024\n",
      "    },\n",
      "    \"requested_instances\": 1,\n",
      "    \"serving_tool\": \"DEFAULT\"\n",
      "}"
     ]
    }
   ],
   "source": [
    "print(\"Description of \" + irisclassifier.name)\n",
    "irisclassifier.describe()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Classify Iris Flowers using the Trained Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Deployment is running: 100%|##########| 1/1 [00:05<00:00,  5.49s/it]"
     ]
    }
   ],
   "source": [
    "irisclassifier.start()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Send Prediction Requests to the Deployed Model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For making inference requests you can use the utility method `.predict()` from the deployment object."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}\n",
      "{'predictions': [0]}"
     ]
    }
   ],
   "source": [
    "for i in range(10):\n",
    "    data = {\"instances\" : [best_model.input_example]}\n",
    "    predictions = irisclassifier.predict(data)\n",
    "    print(predictions)"
   ]
  },
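  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The request body used above wraps the input rows in an `instances` list, and the response carries a `predictions` list. Below is a minimal sketch of building such a payload for several rows and pairing each row with its prediction. The feature values and the response string are hypothetical, and the sketch assumes one prediction is returned per instance, as the `predict` method of the predictor script does.\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "# Hypothetical rows: sepal_length, sepal_width, petal_length, petal_width\n",
    "rows = [[5.1, 3.5, 1.4, 0.2], [6.7, 3.0, 5.2, 2.3]]\n",
    "\n",
    "# Same structure as the `data` dictionary in the loop above\n",
    "payload = {\"instances\": rows}\n",
    "body = json.dumps(payload)\n",
    "\n",
    "# Hypothetical response with the shape printed above\n",
    "response = json.loads('{\"predictions\": [0, 2]}')\n",
    "\n",
    "# Pair every input row with the prediction at the same position\n",
    "paired = list(zip(rows, response[\"predictions\"]))\n",
    "for row, prediction in paired:\n",
    "    print(row, prediction)\n",
    "```"
   ]
  },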
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Monitor Prediction Logs\n",
    "\n",
    "### Consume Prediction Requests and Responses using Kafka\n",
    "\n",
    "All prediction requestst are automatically logged to Kafka which means that you can keep track for your model's performance and its predictions in a scalable manner."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Setup a Kafka consumer and subscribe to the topic containing the prediction logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "TOPIC_NAME = irisclassifier.inference_logger.kafka_topic.name\n",
    "\n",
    "config = kafka.get_kafka_default_config()\n",
    "config['default.topic.config'] = {'auto.offset.reset': 'earliest'}\n",
    "consumer = Consumer(config)\n",
    "topics = [TOPIC_NAME]\n",
    "consumer.subscribe(topics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read the Kafka Avro schema from Hopsworks and setup an Avro reader"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "json_schema = kafka.get_schema(TOPIC_NAME)\n",
    "avro_schema = kafka.convert_json_schema_to_avro(json_schema)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read the lookup table from the Feature Store for converting numerical labels to categorical"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "iris_labels_lookup = fs.get_feature_group(\"iris_labels_lookup\", 1)\n",
    "iris_labels_lookup_df = iris_labels_lookup.read().toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Read messages from the Kafka topic, parse them with the Avro schema and print the results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "serving: irisflowerclassifier, version: 1, timestamp: 1652878029777,\n",
      "         http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT\n",
      "predictions: 0, prediction_label:Setosa\n",
      "\n",
      "serving: irisflowerclassifier, version: 1, timestamp: 1652878030212,\n",
      "         http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT\n",
      "predictions: 0, prediction_label:Setosa\n",
      "\n",
      "serving: irisflowerclassifier, version: 1, timestamp: 1652878031009,\n",
      "         http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT\n",
      "predictions: 0, prediction_label:Setosa\n",
      "\n",
      "serving: irisflowerclassifier, version: 1, timestamp: 1652878031725,\n",
      "         http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT\n",
      "predictions: 0, prediction_label:Setosa\n",
      "\n",
      "serving: irisflowerclassifier, version: 1, timestamp: 1652878032543,\n",
      "         http_response_code: 200, model_server: PYTHON, serving_tool: DEFAULT\n",
      "predictions: 0, prediction_label:Setosa"
     ]
    }
   ],
   "source": [
    "PRINT_INSTANCES=False\n",
    "PRINT_PREDICTIONS=True\n",
    "\n",
    "for i in range(0, 5):\n",
    "    msg = consumer.poll(timeout=5.0)\n",
    "    if msg is not None:\n",
    "        value = msg.value()\n",
    "        try:\n",
    "            event_dict = kafka.parse_avro_msg(value, avro_schema)\n",
    "            \n",
    "            print(\"serving: {}, version: {}, timestamp: {},\\n\"\\\n",
    "                  \"         http_response_code: {}, model_server: {}, serving_tool: {}\".format(\n",
    "                       event_dict[\"modelName\"],\n",
    "                       event_dict[\"modelVersion\"],\n",
    "                       event_dict[\"requestTimestamp\"],\n",
    "                       event_dict[\"responseHttpCode\"],\n",
    "                       event_dict[\"modelServer\"],\n",
    "                       event_dict[\"servingTool\"]))\n",
    "            \n",
    "            if PRINT_INSTANCES:\n",
    "                print(\"instances: {}\\n\".format(event_dict[\"inferenceRequest\"]))\n",
    "            if PRINT_PREDICTIONS:\n",
    "                prediction = json.loads(event_dict[\"inferenceResponse\"])[\"predictions\"][0]\n",
    "                prediction_label = iris_labels_lookup_df.loc[iris_labels_lookup_df['label'] == prediction, 'variety'].iloc[0]\n",
    "                print(\"predictions: {}, prediction_label:{}\\n\".format(prediction, prediction_label))\n",
    "\n",
    "        except Exception as e:\n",
    "            print(\"A message was read but there was an error parsing it\")\n",
    "            print(e)\n",
    "    else:\n",
    "        print(\"timeout.. no more messages to read from topic\")"
   ]
  },
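  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once the log events are parsed, they can be aggregated to monitor the deployment. Below is a minimal sketch that counts predictions per label and computes the share of failed requests. The events are hypothetical dictionaries that reuse the `responseHttpCode` and `inferenceResponse` field names read in the loop above; the empty response body for a failed request is an assumption.\n",
    "\n",
    "```python\n",
    "import json\n",
    "from collections import Counter\n",
    "\n",
    "# Hypothetical parsed log events (not read from Kafka)\n",
    "events = [\n",
    "    {\"responseHttpCode\": 200, \"inferenceResponse\": json.dumps({\"predictions\": [0]})},\n",
    "    {\"responseHttpCode\": 200, \"inferenceResponse\": json.dumps({\"predictions\": [2]})},\n",
    "    {\"responseHttpCode\": 500, \"inferenceResponse\": \"\"},\n",
    "    {\"responseHttpCode\": 200, \"inferenceResponse\": json.dumps({\"predictions\": [0]})},\n",
    "]\n",
    "\n",
    "label_counts = Counter()\n",
    "failed = 0\n",
    "for event in events:\n",
    "    if event[\"responseHttpCode\"] != 200:\n",
    "        failed += 1  # failed requests are counted but not parsed\n",
    "        continue\n",
    "    for prediction in json.loads(event[\"inferenceResponse\"])[\"predictions\"]:\n",
    "        label_counts[prediction] += 1\n",
    "\n",
    "error_rate = failed / len(events)\n",
    "print(dict(label_counts), error_rate)\n",
    "```\n",
    "\n",
    "A shift in the label distribution or a rising error rate over time is the kind of signal such a summary is meant to surface."
   ]
  },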
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "python",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 3
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
